feat(kafka): additional advertised listeners/broker addresses for kafka #2920

Open
wants to merge 2 commits into
base: main
from

Conversation

ghthor

@ghthor ghthor commented Dec 15, 2024

What does this PR do?

  1. Add additional kafka listeners to support more networking configurations
  2. Add additional kafka advertised listeners for the above
  3. Add new methods for retrieving brokers based on different hostnames

Why is it important?

  1. kafka likes to know the exact address where it can be reached
  2. the current advertised listeners return localhost:<random port>
  3. localhost is not a valid address to kafka when running within another docker container

This change is intended to support more complicated testing setups where the test code itself runs inside another docker container and thus needs kafka to advertise a connection string other than localhost.
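
To make the motivation concrete, here is a minimal sketch (not the PR's actual implementation) of how a set of named listeners could be rendered into the comma-separated `NAME://host:port` form that Kafka's `advertised.listeners` setting expects. Kafka hands these addresses back to clients, so each kind of client needs an entry whose host it can resolve. The `LOCALHOST` and `HOST_DOCKER_INTERNAL` names appear in this PR's diff; the `listener` type, the `CONTAINER_NAME` entry, the container name `kafka-1`, and all port numbers are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// listener is a hypothetical helper type: one named Kafka listener and the
// address that clients of that kind should be told to connect to.
type listener struct {
	Name string // listener name, e.g. LOCALHOST
	Host string // host name the client is able to resolve
	Port int    // port reachable from that client
}

// advertisedListeners renders the comma-separated NAME://host:port list.
func advertisedListeners(ls []listener) string {
	parts := make([]string, 0, len(ls))
	for _, l := range ls {
		parts = append(parts, fmt.Sprintf("%s://%s:%d", l.Name, l.Host, l.Port))
	}
	return strings.Join(parts, ",")
}

func main() {
	// Illustrative values only; real ports are assigned by Docker at runtime.
	fmt.Println(advertisedListeners([]listener{
		{Name: "LOCALHOST", Host: "localhost", Port: 32768},
		{Name: "HOST_DOCKER_INTERNAL", Host: "host.docker.internal", Port: 32769},
		{Name: "CONTAINER_NAME", Host: "kafka-1", Port: 9095},
	}))
}
```

A client running in another container that is handed `localhost:32768` would try to connect to itself, which is why the extra entries matter.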

Related issues

  1. Alternate implementation to fix(kafka): Fix internal docker connection #2894
  2. supersedes Fix kafka internal docker connection #2490

How to test this PR

  1. Unit tests added in PR
  2. Examples added in PR

@ghthor ghthor requested a review from a team as a code owner December 15, 2024 14:39
@ghthor ghthor marked this pull request as draft December 15, 2024 14:39

netlify bot commented Dec 15, 2024

Deploy Preview for testcontainers-go ready!

Name Link
🔨 Latest commit 2203abe
🔍 Latest deploy log https://app.netlify.com/sites/testcontainers-go/deploys/6764a58194b5310008d31958
😎 Deploy Preview https://deploy-preview-2920--testcontainers-go.netlify.app

@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch 6 times, most recently from 28c77cc to 1ff166c Compare December 15, 2024 17:37
@ghthor ghthor marked this pull request as ready for review December 15, 2024 17:42
@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch from 1ff166c to 8cffdbf Compare December 15, 2024 18:08
@ghthor ghthor changed the title feat: additional advertised listners/broker addresses for kafka feat(kafka): additional advertised listners/broker addresses for kafka Dec 15, 2024
@ghthor ghthor changed the title feat(kafka): additional advertised listners/broker addresses for kafka feat(kafka): additional advertised listeners/broker addresses for kafka Dec 15, 2024
@mdelapenya
Member

Thanks @ghthor for your work here. I saw you linked to #2894, which provides support for dynamically adding listeners to kafka. IIUC this PR adds fixed listeners instead. What are the implications of having one or the other for the kafka module here? In other words: if we merge this one, do we need the other one?

@eddumelendez could you take a look at this PR too?

@ghthor
Author

ghthor commented Dec 16, 2024

@mdelapenya I think that most of the use cases for dynamic listeners would be covered by this one. The case that wouldn't be covered by this PR is if you need listeners that use a protocol other than PLAINTEXT, e.g. TLS or whatever else kafka supports.

There is no reason that adding more default fixed listeners could not co-exist with the feature to add dynamic listeners. I don't know that I've found a reason to use dynamic listeners yet, and they add many elements of complication, as to declare them you need to know much information upfront or configure more stuff to make them work (hostnames / ports) etc.

The aim of this PR is to avoid as much boilerplate as possible to achieve Container <> Container support. I gave the user 2 options to achieve this aim.

  1. Configure the non-kafka containers with an extra hostname host.docker.internal and use BrokersByHostDockerInternal
  2. Configure both kafka and non-kafka containers to run in a new docker network so container name/id name resolution works and use BrokersByContainer{Name,Id}

You do either of those and you get Container <> Container kafka working AND we can assert all this is working within the kafka module tests because we're not dealing with user configurable listeners.
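
The choice between the two options can be sketched as a simple lookup, under the assumption that each broker address has already been obtained (in the PR, from the existing localhost address, `BrokersByHostDockerInternal`, and `BrokersByContainer{Name,Id}`). The `clientLocation` type, the `brokerAddrs` struct, and `pickBroker` are hypothetical names used only for this illustration, as are the addresses.

```go
package main

import (
	"errors"
	"fmt"
)

// clientLocation is a hypothetical enum describing where the Kafka client runs.
type clientLocation int

const (
	onHost                     clientLocation = iota // test process on the Docker host
	inContainerViaHostGateway                        // container with host.docker.internal mapped (option 1)
	inContainerOnSharedNetwork                       // container sharing a non-default network with Kafka (option 2)
)

// brokerAddrs holds one already-resolved broker address per listener.
type brokerAddrs struct {
	Localhost          string
	HostDockerInternal string
	ContainerName      string
}

// pickBroker returns the address the client at the given location can reach.
func pickBroker(loc clientLocation, b brokerAddrs) (string, error) {
	switch loc {
	case onHost:
		return b.Localhost, nil
	case inContainerViaHostGateway:
		return b.HostDockerInternal, nil
	case inContainerOnSharedNetwork:
		return b.ContainerName, nil
	default:
		return "", errors.New("unknown client location")
	}
}

func main() {
	// Illustrative addresses only.
	addrs := brokerAddrs{
		Localhost:          "localhost:32768",
		HostDockerInternal: "host.docker.internal:32769",
		ContainerName:      "kafka-1:9095",
	}
	addr, err := pickBroker(inContainerOnSharedNetwork, addrs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(addr)
}
```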

@mdelapenya
Member

I don't know that I've found a reason to use dynamic listeners yet, and they add many elements of complication, as to declare them you need to know much information upfront or configure more stuff to make them work (hostnames / ports) etc.

A fan of this! Thanks for the explanation

We'll start reviewing this PR today, and will let @eddumelendez take a look for more kafka-specific questions (I'm totally illiterate on it)

@ghthor
Author

ghthor commented Dec 16, 2024

Will push up a fix for the lint failures. 🏁

@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch from 8cffdbf to bb67aff Compare December 16, 2024 14:16
@ghthor
Author

ghthor commented Dec 17, 2024

@mdelapenya can you authorize another run of the CI test suite?

@mdelapenya
Member

mdelapenya commented Dec 17, 2024

@ghthor could you run make lint from the kafka directory? It will give you more context about the lint error.

  modules/kafka/examples_test.go:12: File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/testcontainers) (gci)
  	"github.com/docker/docker/api/types/container"
  Error: modules/kafka/examples_test.go:73:4: exitAfterDefer: log.Fatalf will exit, and `defer func(){...}(...)` will not run (gocritic)
  			log.Fatalf("failed to get container state: %s", err)
  			^
  Error: modules/kafka/examples_test.go:182:4: exitAfterDefer: log.Fatalf will exit, and `defer func(){...}(...)` will not run (gocritic)
  			log.Fatalf("failed to get container state: %s", err)
  			^
  Error: modules/kafka/examples_test.go:284:4: exitAfterDefer: log.Fatalf will exit, and `defer func(){...}(...)` will not run (gocritic)
  			log.Fatalf("failed to get container state: %s", err)
  			^
  modules/kafka/kafka.go:66: File is not `gofumpt`-ed (gofumpt)
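
For context on the `exitAfterDefer` findings: `log.Fatalf` calls `os.Exit(1)`, which skips deferred functions, so a deferred container termination would never run. One common way around this, sketched below with only the standard library, is to return the error from a helper and report it after the deferred cleanup has run. The `run` function and its `cleaned` flag are hypothetical stand-ins for the example body and the container termination; this is not the code from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// run holds the logic that needs deferred cleanup. Returning an error instead
// of calling log.Fatalf lets every deferred function run before the caller
// decides how to report the failure.
func run(fail bool, cleaned *bool) error {
	// Stands in for terminating the container.
	defer func() { *cleaned = true }()

	if fail {
		return errors.New("failed to get container state")
	}
	return nil
}

func main() {
	var cleaned bool
	err := run(true, &cleaned)
	// By the time the error is reported, the deferred cleanup has already run.
	fmt.Printf("err=%v cleaned=%t\n", err, cleaned)
}
```

Inside an `Example` function the equivalent is usually to log with `log.Printf` and `return`, which also lets the deferred cleanup run.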

@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch from bb67aff to 7395ce5 Compare December 17, 2024 16:04
@ghthor
Author

ghthor commented Dec 18, 2024

@mdelapenya Done

@mdelapenya mdelapenya self-assigned this Dec 18, 2024
@mdelapenya mdelapenya added the feature New functionality or new behaviors on the existing one label Dec 18, 2024
Collaborator

@stevenh stevenh left a comment

Thanks for the update. I've done an initial pass on this; there are a few bugs, suggestions and questions.

Comment on lines 98 to 99
string slice, containing the hostname `host.docker.internal` and a random port
defined by Kafka's public port (`19092/tcp`).
Collaborator

suggestion: clarify what the 19092 port is; on reading, it's not clear whether it's an example of a random port or is reinforcing the standard port.

@@ -19,7 +27,7 @@ func ExampleRun() {
 	)
 	defer func() {
 		if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
-			log.Printf("failed to terminate container: %s", err)
+			log.Fatalf("failed to terminate container: %s", err)
Collaborator

bug: this breaks clean up, please revert.

// Clean up the container after
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
log.Fatalf("failed to terminate container: %s", err)
Collaborator

bug: this should be Printf

}
}()

{
Collaborator

suggestion: no need for wrapping braces which make this harder to read, more below.

network.

<!--codeinclude-->
[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka
Collaborator

bug: looks like the wrong name here; the text says "by docker network" but the link points to "by container".

Author

I think this is correct. It's a 2-part explanation: I was trying to highlight that this works best (maybe ONLY works) if you start the Kafka testcontainer inside a non-default docker network. So I broke this into 2 codeincludes, one focusing on starting the Kafka test container in a non-default network and the second on starting another container in that same network.

Comment on lines 35 to 39
// Clean up the container after the test is complete
t.Cleanup(func() {
require.NoError(t, kafkaContainer.Terminate(ctx), "failed to terminate container: %v", err)
})

Collaborator

bug: this isn't needed as that's what CleanupContainer does.

Author

Excellent 🍾

modules/kafka/kafka_test.go
-	require.Truef(t, strings.EqualFold(string(consumer.message.Value), "value"), "expected value to be %s, got %s", "value", string(consumer.message.Value))
+	require.Truef(t, strings.EqualFold(string(consumer.message.Value), value), "expected value to be %s, got %s", value, string(consumer.message.Value))

assertBrokers := func(
Collaborator

suggestion: move to separate method to avoid the function getting so large.

Comment on lines 111 to 113
t.Cleanup(func() {
require.NoError(t, kcat.Terminate(ctx), "failed to terminate container")
})
Collaborator

bug: use CleanupContainer before the error check.
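
The ordering matters because a start call can fail after the container already exists. The following standard-library sketch simulates that situation, assuming the start function may return a non-nil resource together with an error. The `resource`, `start`, and `cleanups` names are hypothetical stand-ins for the container, the run function, and `t.Cleanup`; they are not part of testcontainers-go.

```go
package main

import (
	"errors"
	"fmt"
)

// resource stands in for a started container.
type resource struct{ terminated bool }

func (r *resource) Terminate() { r.terminated = true }

// start simulates a run function that can fail after the container has been
// created, returning both a non-nil resource and an error.
func start(failReadiness bool) (*resource, error) {
	r := &resource{}
	if failReadiness {
		return r, errors.New("wait strategy timed out")
	}
	return r, nil
}

// cleanups mimics t.Cleanup: registered functions run in reverse order.
type cleanups []func()

func (c *cleanups) add(f func()) { *c = append(*c, f) }

func (c *cleanups) run() {
	for i := len(*c) - 1; i >= 0; i-- {
		(*c)[i]()
	}
}

// startAndCheck registers the cleanup BEFORE checking the error, so a
// half-started resource is still terminated.
func startAndCheck(c *cleanups, fail bool) (*resource, error) {
	r, err := start(fail)
	c.add(func() {
		if r != nil { // nil-safe, in case nothing was created
			r.Terminate()
		}
	})
	if err != nil {
		return r, fmt.Errorf("start: %w", err)
	}
	return r, nil
}

func main() {
	var c cleanups
	r, err := startAndCheck(&c, true)
	c.run()
	fmt.Printf("err=%v terminated=%t\n", err, r.terminated)
}
```

Had the cleanup been registered after the error check, the early return would have left the resource running.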

Comment on lines +170 to +203
assert := func(listener string) {
t.Helper()
require.Containsf(t, bs, listener, "expected advertised listeners to contain %s, got %s", listener, bs)
}

mustBrokers := func(fn func(context.Context) ([]string, error)) string {
Collaborator

suggestion: move to helper functions to avoid function bloat.

Author

Felt like this one benefited from being an internal closure: it's short, and requires too many parameters when lifted up to a function.

@mdelapenya
Copy link
Member

@eddumelendez what are your thoughts on this PR?

@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch from bcffa98 to 7874ca9 Compare December 19, 2024 22:51
@ghthor ghthor force-pushed the kafka-additional-broker-addresses branch from 7874ca9 to 2203abe Compare December 19, 2024 23:00
Comment on lines +89 to +132
	t.Run("BrokersByHostDockerInternal", func(t *testing.T) {
		brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx)
		require.NoError(t, err)

		kcat, err := runKcatContainer(ctx, brokers, func(hc *container.HostConfig) {
			hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
		}, nil)
		testcontainers.CleanupContainer(t, kcat)
		require.NoError(t, err)

		l, err := kcat.Logs(ctx)
		require.NoError(t, err)
		defer l.Close()

		assertKcatReadMsg(t, l)
	})
	t.Run("BrokersByContainerId", func(t *testing.T) {
		brokers, err := kafkaContainer.BrokersByContainerId(ctx)
		require.NoError(t, err)

		kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name})
		testcontainers.CleanupContainer(t, kcat)
		require.NoError(t, err)

		l, err := kcat.Logs(ctx)
		require.NoError(t, err)
		defer l.Close()

		assertKcatReadMsg(t, l)
	})
	t.Run("BrokersByContainerName", func(t *testing.T) {
		brokers, err := kafkaContainer.BrokersByContainerName(ctx)
		require.NoError(t, err)

		kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name})
		testcontainers.CleanupContainer(t, kcat)
		require.NoError(t, err)

		l, err := kcat.Logs(ctx)
		require.NoError(t, err)
		defer l.Close()

		assertKcatReadMsg(t, l)
	})
Author

@stevenh Much happier with this after your suggestion to not use that giant closure here 🫡

Member

@eddumelendez eddumelendez left a comment

The proposed idea is good; however, I lean towards prioritizing user flexibility. Introducing these changes as defaults could potentially break existing users who have already defined their own listeners.

Additionally, there is a specific use case that this approach does not address: scenarios where tools like Toxiproxy are used in front of Kafka to simulate network conditions and test resiliency. This highlights the importance of maintaining configurability to accommodate diverse testing and deployment needs.

Comment on lines +192 to +193
fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int()),
fmt.Sprintf("HOST_DOCKER_INTERNAL://%s:%d", "host.docker.internal", portDh.Int()),
Member

Both have the same purpose, and LOCALHOST is more generic for CI and local usage. IMO, we just need to keep the first one.

Author

You can't use a broker address of localhost from another docker container, that's the entire point of this proposed addition.

#2920 (comment)

@ghthor ghthor requested a review from stevenh December 20, 2024 14:14
@ghthor
Copy link
Author

ghthor commented Dec 20, 2024

The proposed idea is good; however, I lean towards prioritizing user flexibility. Introducing these changes as defaults could potentially break existing users who have already defined their own listeners.

Do you have an example of how to do that? This change is proposed because that's not really possible to do.

Additionally, there is a specific use case that this approach does not address: scenarios where tools like Toxiproxy are used in front of Kafka to simulate network conditions and test resiliency.

Do you have an example of what this looks like? I don't understand how adding a few more listeners and advertised listeners would break this.

@eddumelendez

@ghthor
Copy link
Author

ghthor commented Dec 20, 2024

I could see what it looks like to put all this behind functional options. It would differ from the other PR in that we're not going to allow you to configure any of them, just whether they are enabled or not.

This does have the benefit that we could assert some things and produce errors if the API isn't used correctly.

Example being, if you want to use the BrokersByContainer{Name,Id} API you need to run the testcontainer in a docker network, not the default one. So if you enable that and don't pass in a network then we can produce a helpful error.

Using BrokersByHostDockerInternal couldn't be verified in the same way, as that requires configuring the OTHER container. The other 2 methods could verify that, though, and we should probably produce an error for those 2 now because we know whether kafka was started in the default network or not.
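
A rough sketch of what "enabled or not" functional options with validation might look like follows. Every name here (`config`, `Option`, `WithHostDockerInternalListener`, `WithContainerListeners`, `WithNetwork`, `newConfig`, `errNeedsNetwork`) is hypothetical and chosen for illustration; none of them is an existing testcontainers-go API.

```go
package main

import (
	"errors"
	"fmt"
)

// config records which extra listeners are enabled and which networks the
// container will join.
type config struct {
	hostDockerInternal bool
	containerListeners bool
	networks           []string
}

// Option toggles a feature; users cannot customize hosts or ports.
type Option func(*config)

func WithHostDockerInternalListener() Option {
	return func(c *config) { c.hostDockerInternal = true }
}

func WithContainerListeners() Option {
	return func(c *config) { c.containerListeners = true }
}

func WithNetwork(name string) Option {
	return func(c *config) { c.networks = append(c.networks, name) }
}

var errNeedsNetwork = errors.New(
	"container name/ID listeners require a non-default Docker network")

// newConfig applies the options and rejects combinations that cannot work.
func newConfig(opts ...Option) (config, error) {
	var c config
	for _, opt := range opts {
		opt(&c)
	}
	if c.containerListeners && len(c.networks) == 0 {
		return config{}, errNeedsNetwork
	}
	return c, nil
}

func main() {
	// Enabling container listeners without a network yields a helpful error.
	if _, err := newConfig(WithContainerListeners()); err != nil {
		fmt.Println("error:", err)
	}
	c, err := newConfig(WithContainerListeners(), WithNetwork("kafka-net"))
	fmt.Println(c.networks, err)
}
```

The host.docker.internal option cannot be validated this way, since it depends on how the other container is configured.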

@eddumelendez
Copy link
Member

I have reviewed various use cases and examined bugs reported in other language implementations. Below is a summary of the key requirements we should address in the API:

  1. Avoid Using host.docker.internal as a default
    Using host.docker.internal as a default can create confusion with the equivalent feature provided by Docker Desktop (reference). Moreover, it requires additional configuration in the companion container, making it less convenient for users.

  2. Flexible listener Configuration
    Listeners can be configured using networking to be accessed via container name or container ID, as demonstrated in this PR. However, some users may prefer to utilize the container’s IP address. The API should accommodate these varied preferences, providing users with flexibility in how they configure and use it.

  3. Proxy Configuration for Kafka Broker
    As previously mentioned, a proxy can be configured on top of the Kafka broker. For this to function effectively, the proxy will need to register the network alias or the container IP to facilitate communication between containers.
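
If listeners become user-configurable as these requirements suggest, the host part could be a container name, a container ID, an IP address, or a network alias, so some validation of the supplied value would be useful. The sketch below parses a `NAME://host:port` string with the standard library; `parseListener`, the `listener` type, and the sample values are assumptions for illustration, not part of this PR or of testcontainers-go.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// listener is a hypothetical parsed form of a NAME://host:port entry.
type listener struct {
	Name string
	Host string // container name, container ID, IP address, or network alias
	Port int
}

// parseListener validates a user-supplied listener string.
func parseListener(s string) (listener, error) {
	name, addr, ok := strings.Cut(s, "://")
	if !ok || name == "" {
		return listener{}, fmt.Errorf("listener %q: want NAME://host:port", s)
	}
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return listener{}, fmt.Errorf("listener %q: %w", s, err)
	}
	if host == "" {
		return listener{}, fmt.Errorf("listener %q: empty host", s)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port < 1 || port > 65535 {
		return listener{}, fmt.Errorf("listener %q: invalid port %q", s, portStr)
	}
	return listener{Name: name, Host: host, Port: port}, nil
}

func main() {
	// Illustrative inputs: an IP-based listener, a proxy alias, and a bad value.
	for _, s := range []string{"BROKER://172.18.0.3:9093", "PROXY://kafka-proxy:9094", "nope"} {
		l, err := parseListener(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s:%d\n", l.Name, l.Host, l.Port)
	}
}
```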

Labels
feature New functionality or new behaviors on the existing one

4 participants